package com.example.socket.client;

import com.example.socket.core.Message;
import com.example.socket.core.MessageConstant;
import com.example.socket.core.Response;
import com.example.socket.core.Session;
import com.example.socket.exception.SocketException;
import com.example.socket.handler.AbstractListener;
import com.example.socket.handler.Dispatcher;
import com.example.socket.handler.NettyHandler;
import com.example.socket.handler.SnGenerator;
import com.example.socket.server.SessionConfig;
import com.example.socket.thread.DelayedElement;
import com.example.socket.thread.NamedThreadFactory;
import com.example.socket.utils.JsonUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.*;
import java.net.InetSocketAddress;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 客户端工厂对象
 */
@SuppressWarnings("rawtypes")
public class ClientFactory {

    private static final Logger logger = LoggerFactory.getLogger(ClientFactory.class);

    /** 默认连接地址 */
    private String defaultAddress;
    /** 全局会话配置 */
    private SessionConfig sessionConfig;
    /** 客户端过期时间(秒) */
    private int removeTimes;
    /**失败消息存储文件路径*/
    private String messageFile;
    /**客户端业务处理核心线程*/
    private int clientThreadsCore;
    /**客户端io线程*/
    private int clientIoCore;
    /** 与 {@link }的连接 */
    private volatile Bootstrap connector;
    /** 会话的过滤器 */
    private Map<String, ChannelHandler> filters;
    /** 序列号生成器(全部客户端共用) */
    private SnGenerator generator = new SnGenerator();
    /** 客户端操作锁 */
    private ConcurrentMap<String, Lock> locks = new ConcurrentHashMap<>();

    /** tcp */
    protected ConcurrentMap<String, AbstractClient<?>> tcpClients = new ConcurrentHashMap<>();
    /** kcp */
    protected ConcurrentMap<String, AbstractClient<?>> kcpClients = new ConcurrentHashMap<>();

    protected volatile boolean opened = true;
    /** 回调集合 */
    protected ConcurrentHashMap<Integer, SocketFuture> futures = new ConcurrentHashMap<>();
    /**心跳数据包*/
    private Message heartBeat;
    /**心跳包间隔时间*/
    private int heartBeatInterval;
    private ExecutorService executor;
    /** 回应超时时间(毫秒) */
    int responseTimeout;
    /**消息重发时间单位毫秒*/
    int resendTime;
    int maxRetry;
    /** 过期客户端以及待重发的消息队列 */
    DelayQueue<DelayedElement<?>> delays = new DelayQueue<>();
    /** 指令注册器(作为所有客户端的私有指令注册器的种子) */
    Dispatcher dispatcher;
    /**BOSS group*/
    private EventLoopGroup group;

    public ClientFactory(ClientConfig config, Dispatcher dispatcher, Map<String, ChannelHandler> filters) {
        SessionConfig sessionConfig = config.getSessionConfig();
        this.sessionConfig = sessionConfig;
        this.dispatcher = dispatcher;
        this.filters = filters;
        this.defaultAddress = config.getDefaultAddress();
        this.removeTimes = config.getRemoveTimes();
        this.responseTimeout = config.getResponseTimeout();
        this.resendTime = config.getResendTime() * 1000;
        this.messageFile = config.getMessageFile();
        this.heartBeatInterval = config.getHeartBeatInterval();
        this.heartBeat = config.getHeartBeat();
        this.clientThreadsCore = config.getClientThreadsCore();
        this.clientIoCore = config.getClientIoCore();
        this.maxRetry = config.getMaxRetry();
    }

    private ClientFactory() {
    }


    // 客户端工厂自身的生命周期方法

    /** 初始化方法 */
    @PostConstruct
    public void initialize() {
        // 初始化
        bootstrap0();
    }

    /**
     *  创建与配置连接器
     */
    private void bootstrap0() {
        connector = new Bootstrap();
        // 设置会话配置
        Map<ChannelOption<?>, ?> sessionOptions = sessionConfig.buildClient();
        for (Entry<?, ?> e : sessionOptions.entrySet()) {
            ChannelOption<Object> key = (ChannelOption<Object>) e.getKey();
            Object value = e.getValue();
            connector.option(key, value);
        }
        group = new NioEventLoopGroup(clientIoCore, new NamedThreadFactory("客户端NIO线程"));
        connector.group(group);
        connector.channel(NioSocketChannel.class);
        ThreadGroup threadGroup = new ThreadGroup("客户端通信模块");
        executor = new NioEventLoopGroup(clientThreadsCore, new NamedThreadFactory(threadGroup, "业务线程"));
        //添加一个监听器
        dispatcher.addListener(new ClientListener(futures));
        // 创建控制器与配置
        NettyHandler handler = new ClientHandler(dispatcher, executor);
        connector.handler(new ClientHandlerInitializer(handler, filters));
        /**重发消息文件*/
        File file = new File(messageFile);
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                logger.error("client端消息文件创建失败");
                logger.error(file.getAbsolutePath());
                throw new RuntimeException(e);
            }
        }
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
            String line = reader.readLine();
            while (StringUtils.isNotBlank(line)) {
                MessageResend messageResend = JsonUtils.string2Object(line, MessageResend.class);
                delays.add(messageResend);
                line = reader.readLine();
            }
        } catch (Exception e) {
            logger.error("client端消息文件读取失败");
            throw new RuntimeException(e);
        }
        // 启动客户端工厂守护线程
        Thread removeClientThread = new Thread("客户端工厂守护线程") {
            @Override
            public void run() {
                while (!Thread.interrupted() && opened) {
                    try {
                        DelayedElement<?> e = delays.take();
                        Object content = e.getContent();
                        if (content instanceof AbstractClient<?>) {
                            AbstractClient<?> client = (AbstractClient<?>) content;
                            boolean delay = checkRemove(client);
                            if (delay) {
                                // 延迟再检查
                                submitRemove(client);
                            }
                        } else if (content instanceof Integer) {
                            Integer sn = (Integer) content;
                            SocketFuture future = futures.get(sn);
                            if (future != null) {
                                future.timeOut();
                            }
                        } else if (e instanceof MessageResend) {
                            MessageResend resend = (MessageResend) e;
                            Message message = resend.getContent();
                            String address = resend.getAddress();
                            AbstractClient<?> client = getClient(resend.getClientType(), address);
                            if (client == null) {
                                e.setEnd(new Date(System.currentTimeMillis() + resendTime));
                                delays.put(e);
                                continue;
                            }
                            client.send(message, true);
                        } else if (e instanceof HeartBeatPacket) {
                            HeartBeatPacket packet = (HeartBeatPacket) e;
                            String address = packet.getContent();
                            AbstractClient<?> client = getClient(packet.getClientType(), address);
                            if (client == null) {
                                continue;
                            }
                            heartBeat.setSn(generator.next());
                            client.send(heartBeat, false);
                            Date end = new Date(System.currentTimeMillis() + heartBeatInterval);
                            packet = new HeartBeatPacket(client.type, address, end);
                            if (client.isSendHeartBeat()) {
                                delays.add(packet);
                            }
                        }

                    } catch (InterruptedException e) {
                        logger.error("客户端工厂守护线程被打断", e);
                    } catch (Exception e) {
                        logger.error("客户端工厂守护线程出现未知异常", e);
                    }
                }
            }

        };
        removeClientThread.setDaemon(true);
        removeClientThread.start();
    }

    public static class ClientListener extends AbstractListener {
        private ConcurrentHashMap<Integer, SocketFuture> futures;

        public ClientListener(ConcurrentHashMap<Integer, SocketFuture> futures) {
            this.futures = futures;
        }

        @Override
        public void sent(Message message, Session session) {
            int sn = message.getSn();
            if (logger.isDebugEnabled()) {
                SocketFuture<Response> entry = futures.get(sn);
                if (entry == null) {
                    logger.debug("===消息[{}]发送无回调对象===", sn);
                } else {
                    logger.debug("===消息[{}]已发送===", sn);
                }
            }
        }

        @Override
        public void received(Message message, Session session) {
            int sn = message.getSn();
            if (!message.isResponse()) {
                // 不是回应消息
                if (logger.isDebugEnabled()) {
                    logger.debug("===消息[{}]请求, 不是回应忽略===", sn);
                }
                return;
            }
            SocketFuture<Response> entry = futures.get(sn);
            if (entry == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("===消息[{}]回应, 无请求忽略===", sn);
                }
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("===消息[{}]回应, 等待处理中===", sn);
            }
            if (message.hasState(MessageConstant.STATE_ERROR) || message.hasState(MessageConstant.IDENTITY_EXCEPTION)
                    || message.hasState(MessageConstant.TARGET_SERVER_NOT_FOUND) || message.hasState(MessageConstant.UNKNOWN_EXCEPTION)) {
                entry.onError(new RuntimeException("消息发送发生错误，消息状态 : " + message.getState()));
                return;
            }
            Response response;
            try {
                entry.getClient().setSessionId(message.getSession());
                response = getDispatcher().decodeResponse(message);
            } catch (Exception e) {
                // 解码异常
                entry.onError(e);
                return;
            }
            entry.onSuccess(response);
            if (logger.isDebugEnabled()) {
                logger.debug("---消息[{}]已处理---", sn);
            }
        }
    }

    /** 销毁方法 */
    @PreDestroy
    public void destory() {
        opened = false;
        for (Client client : tcpClients.values()) {
            client.close();
        }
        for (Client client : kcpClients.values()) {
            client.close();
        }
        tcpClients.clear();
        kcpClients.clear();
        saveMessages();
    }

    private void saveMessages() {
        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(messageFile)))) {
            for (DelayedElement<?> de : delays) {
                if (de instanceof MessageResend) {
                    String line = JsonUtils.object2String(de);
                    writer.write(line);
                    writer.newLine();
                }
            }
        } catch (Exception e) {
            logger.error("client端消息文件写入失败, ,消息内容:[{}],内容中可能包含client对象", JsonUtils.object2String(delays));
        }
    }

    /**
     * 提交移除非活跃客户端
     */
    void submitRemove(Client client) {
        // 提交超时任务
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, removeTimes);
        DelayedElement<Client> element = DelayedElement.valueOf(client, calendar.getTime());
        delays.put(element);
    }

    /**
     * 移除非活跃客户端
     * @return 是否延迟再检查
     */
    private boolean checkRemove(AbstractClient<?> client) {
        if (client == null || client.channel == null) {
            // 客户端已经销毁
            return false;
        }

        if (client.isKeepAlive()) {
            // 长连接
            return false;
        }

        if (!client.isIdle()) {
            // 有消息等待回应, 延时再进行检查
            return true;
        }
        String address = client.getAddress();
        // 检查是否超时没有活动
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(client.getTimestamp());
        calendar.add(Calendar.SECOND, removeTimes);
        Date now = new Date();
        Date check = calendar.getTime();
        if (check.after(now)) {
            // 客户端还处于活跃状态，延时再进行检查
            if (logger.isDebugEnabled()) {
                logger.debug("客户端活跃失效时间[{}]未到期, 等待下次检测...", check);
            }
            return true;
        }
        if (logger.isWarnEnabled()) {
            logger.warn("关闭空闲超时客户端连接 - {}", client.getAddress());
        }
        Lock lock = loadClientLock(address);
        lock.lock();
        try {
            // 移除缓存
            removeClient(client.type, address);
        } finally {
            lock.unlock();
        }
        client.close();
        return false;
    }

    // 通信客户端管理方法

    /**
     * 直接创建一个不受客户端工厂管理的客户端
     * @param type 类型 Client#TCP or Client#KCP
     * @param address 服务器地址加端口(ip:port)
     * @return
     */
    public AbstractClient<?> createClient(int type, String address) {
        InetSocketAddress socketAddress = toInetSocketAddress(address);
        switch (type) {
            case Client.KCP:
                //return new KcpClient(dispatcher, socketAddress, group, executor, this, false, generator, futures, delays, false);
            default:
                return new SimpleClient(dispatcher, socketAddress, connector, this, false, generator, futures, delays, false);
        }
    }

    /**
     * 直接创建一个不受客户端工厂管理的tcp客户端
     * @param address 服务器地址加端口(ip:port)
     * @return
     */
    public AbstractClient<?> createClient(String address) {
        return createClient(Client.TCP, address);
    }

    /**
     * 直接创建默认地址一个不受客户端工厂管理的tcp客户端
     * @param address 服务器地址加端口(ip:port)
     * @return
     */
    public AbstractClient<?> createClient() {
        if (defaultAddress == null) {
            throw new IllegalStateException("默认客户端连接服务器的配置缺失");
        }
        return createClient(Client.TCP, defaultAddress);
    }

    /**
     * 获取默认tcp客户端
     * @param keepAlive 是否维持客户端的生命
     * @return
     * @throws IllegalStateException 默认客户端连接服务器的配置缺失时抛出
     */
    public Client getClient(boolean keepAlive) {
        if (defaultAddress == null) {
            throw new IllegalStateException("默认客户端连接服务器的配置缺失");
        }
        return getClient(defaultAddress, keepAlive);
    }

    /**
     * 获取指定地址对应的通信tcp客户端
     * @param address 服务器地址加端口(ip:port)
     * @param keepAlive 是否维持客户端的生命
     * @return
     */
    public Client getClient(String address, boolean keepAlive) {
        return getClient(address, keepAlive, false);
    }

    /**
     * 获取指定地址对应的通信客户端
     * @param address 服务器地址加端口(ip:port)
     * @param keepAlive 是否维持客户端的生命
     * @return
     */
    public Client getClient(int type, String address, boolean keepAlive) {
        return getClient(type, address, keepAlive, false);
    }

    public Client getClient(String address, boolean keepAlive, boolean sendHeartBeat) {
        return getClient(Client.TCP, address, keepAlive, sendHeartBeat);
    }

    public Client getClient(int type, String address, boolean keepAlive, boolean sendHeartBeat) {
        Lock lock = loadClientLock(address);
        lock.lock();
        try {
            AbstractClient<?> client = getClient(type, address);
            if (client != null) {
                client.timestamp = System.currentTimeMillis();
                if (client.keepAlive != keepAlive) {
                    client.keepAlive = keepAlive;
                }
            } else {
                client = createClient(type, address);
                AbstractClient<?> prev = putClient(address, client);
                if (prev != null && prev != client) {
                    client = prev;
                } else {
                    client.keepAlive = keepAlive;
                }
            }
            if (!client.sendHeartBeat && sendHeartBeat && keepAlive && heartBeatInterval > 0) {
                Date end = new Date(System.currentTimeMillis() + this.heartBeatInterval);
                client.sendHeartBeat = sendHeartBeat;
                HeartBeatPacket packet = new HeartBeatPacket(client.type, address, end);
                this.delays.add(packet);
            }
            return client;
        } catch (Exception e) {
            removeClientLock(address);
            // 异常处理
            FormattingTuple message = MessageFormatter.format("无法获取指定服务器地址的客户端对象[{}]", address, e);
            logger.error(message.getMessage());
            throw new SocketException(message.getMessage(), e);
        } finally {
            lock.unlock();
        }
    }

    public Dispatcher getDispatcher() {
        return dispatcher;
    }

    // 内部方法

    /**
     * 获取客户端操作锁
     * @param address 服务器地址
     * @return 不会返回null
     */
    private Lock loadClientLock(String address) {
        Lock result = locks.get(address);
        if (result != null) {
            return result;
        }

        result = new ReentrantLock();
        Lock prev = locks.putIfAbsent(address, result);
        return prev == null ? result : prev;
    }

    private AbstractClient<?> getClient(int type, String address) {
        if (type == Client.KCP) {
            return kcpClients.get(address);
        }
        return tcpClients.get(address);
    }

    private AbstractClient<?> putClient(String address, AbstractClient<?> client) {
        if (client.type == Client.KCP) {
            return kcpClients.putIfAbsent(address, client);
        } else {
            return tcpClients.putIfAbsent(address, client);
        }
    }

    private void removeClient(int type, String address) {
        if (type == Client.KCP) {
            kcpClients.remove(address);
        } else {
            tcpClients.remove(address);
        }
    }

    /**
     * 移除客户端操作锁
     * @param address 服务器地址
     */
    private void removeClientLock(String address) {
        locks.remove(address);
    }

    /** 连接地址 */
    private InetSocketAddress toInetSocketAddress(String text) {
        if (StringUtils.isEmpty(text)) {
            throw new IllegalArgumentException("无效的地址字符串: " + text);
        }

        int colonIndex = text.lastIndexOf(":");
        if (colonIndex > 0) {
            String host = text.substring(0, colonIndex);
            if (!"*".equals(host)) {
                int port = parsePort(text.substring(colonIndex + 1));
                return new InetSocketAddress(host, port);
            }
        }

        int port = parsePort(text.substring(colonIndex + 1));
        return new InetSocketAddress(port);
    }

    /** 获取端口值 */
    private int parsePort(String s) {
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException nfe) {
            throw new IllegalArgumentException("无效的端口值: " + s);
        }
    }

    public int getMaxRetry() {
        return maxRetry;
    }
    // Getter and Setter ...

    public void setFilters(Map<String, ChannelHandler> filters) {
        this.filters = filters;
    }

}